% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-

%% A dreyfus_index gen_server is linked to its clouseau twin.

-module(dreyfus_index).
-behaviour(gen_server).
-include_lib("couch/include/couch_db.hrl").
-include("dreyfus.hrl").

% public api.
-export([
    start_link/2,
    design_doc_to_index/2,
    await/2,
    search/2,
    info/1,
    group1/2,
    group2/2,
    design_doc_to_indexes/1
]).

% gen_server api.
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2
]).

% private definitions.
-record(state, {
    % database name handed to start_link/2 and stored by init/1
    dbname,
    % #index{} record; init/1 fills in its current_seq and dbname fields
    index,
    % pid of the process running dreyfus_index_updater:update/2, or nil
    % when no update is in flight
    updater_pid = nil,
    % pid returned by clouseau_rpc:open_index/3
    index_pid = nil,
    % [{From, RequestSeq}] for await callers that have not been answered yet
    waiting_list = []
}).

% exported for callback.
-export([search_int/2, group1_int/2, group2_int/2, info_int/1]).

% public functions.
%% Starts a linked index process via proc_lib; the new process begins in
%% init/1 with {DbName, Index} and acknowledges through proc_lib:init_ack/1.
start_link(DbName, Index) ->
    InitArgs = [{DbName, Index}],
    proc_lib:start_link(?MODULE, init, InitArgs).

%% Sends {await, MinSeq} to the index server Pid with an infinite call
%% timeout. The call is handed to dreyfus_util:time/2 as an MFA under the
%% [index, await] key.
await(Pid, MinSeq) ->
    Request = {await, MinSeq},
    dreyfus_util:time(
        [index, await],
        {gen_server, call, [Pid, Request, infinity]}
    ).

%% Resolves Pid0 to the index pid (see to_index_pid/1) and runs
%% search_int/2 through dreyfus_util:time/2 under the [index, search] key.
search(Pid0, QueryArgs) ->
    IndexPid = to_index_pid(Pid0),
    dreyfus_util:time(
        [index, search],
        {?MODULE, search_int, [IndexPid, QueryArgs]}
    ).

%% Resolves Pid0 to the index pid (see to_index_pid/1) and runs
%% group1_int/2 through dreyfus_util:time/2 under the [index, group1] key.
group1(Pid0, QueryArgs) ->
    IndexPid = to_index_pid(Pid0),
    dreyfus_util:time(
        [index, group1],
        {?MODULE, group1_int, [IndexPid, QueryArgs]}
    ).

%% Resolves Pid0 to the index pid (see to_index_pid/1) and runs
%% group2_int/2 through dreyfus_util:time/2 under the [index, group2] key.
group2(Pid0, QueryArgs) ->
    IndexPid = to_index_pid(Pid0),
    dreyfus_util:time(
        [index, group2],
        {?MODULE, group2_int, [IndexPid, QueryArgs]}
    ).

%% Resolves Pid0 to the index pid (see to_index_pid/1) and runs
%% info_int/1 through dreyfus_util:time/2 under the [index, info] key.
info(Pid0) ->
    IndexPid = to_index_pid(Pid0),
    dreyfus_util:time(
        [index, info],
        {?MODULE, info_int, [IndexPid]}
    ).

%% Callers hold either a dreyfus_index gen_server pid or the remote
%% clouseau pid. A pid living on this node is asked (get_index_pid call,
%% infinite timeout) for the index pid it holds; a pid on any other node
%% is returned unchanged.
to_index_pid(Pid) ->
    LocalNode = node(),
    case node(Pid) of
        LocalNode -> gen_server:call(Pid, get_index_pid, infinity);
        _OtherNode -> Pid
    end.

%% Returns the #index{} records for every entry under the design
%% document's "indexes" member. Entries for which design_doc_to_index/2
%% does not return {ok, #index{}} (including ones where it raises) are
%% skipped. Returns [] when "indexes" is not an object.
design_doc_to_indexes(#doc{body = {Fields}} = Doc) ->
    case couch_util:get_value(<<"indexes">>, Fields, {[]}) of
        {Entries} when is_list(Entries) ->
            {Names, _Defs} = lists:unzip(Entries),
            % The generator pattern acts as the filter: any result that is
            % not {ok, #index{}} simply does not match and is dropped.
            [
                Index
             || Name <- Names,
                {ok, #index{} = Index} <- [(catch design_doc_to_index(Doc, Name))]
            ];
        _ ->
            []
    end.

% gen_server functions.

%% proc_lib entry point used by start_link/2.
%%
%% Traps exits, opens the clouseau index for DbName/Index, then briefly
%% opens the database to monitor it and to let
%% dreyfus_util:maybe_create_local_purge_doc/3 run. On success the parent
%% is acknowledged with {ok, self()} and the process enters the gen_server
%% loop with a #state{} holding the index pid and the index record updated
%% with its current sequence and database name. If opening the index or
%% the database fails, the failure term is passed to proc_lib:init_ack/1
%% and the function returns.
init({DbName, Index}) ->
    process_flag(trap_exit, true),
    case open_index(DbName, Index) of
        {ok, Pid, Seq} ->
            State = #state{
                dbname = DbName,
                index = Index#index{current_seq = Seq, dbname = DbName},
                index_pid = Pid
            },
            case couch_db:open_int(DbName, []) of
                {ok, Db} ->
                    % Every use of Db stays inside the try so the handle is
                    % never touched after couch_db:close/1 has released it;
                    % the `after` still guarantees the close on any exit.
                    try
                        couch_db:monitor(Db),
                        dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index)
                    after
                        couch_db:close(Db)
                    end,
                    proc_lib:init_ack({ok, self()}),
                    gen_server:enter_loop(?MODULE, [], State);
                Error ->
                    proc_lib:init_ack(Error)
            end;
        Error ->
            proc_lib:init_ack(Error)
    end.

%% Synchronous requests.
%%
%% {await, Seq}:
%%   * index is behind and no updater is running: start an updater (unless
%%     the index is black-listed) and queue the caller;
%%   * index already at or past Seq: reply {ok, IndexPid, CurrentSeq};
%%   * otherwise (an updater is already running): queue the caller.
%% get_index_pid: reply with the index pid held in the state.
%% {search, _} | {group1, _} | {group2, _} | info: run the matching *_int
%%   function against the index pid and reply with its result.
handle_call(
    {await, WantedSeq},
    From,
    #state{
        index = #index{current_seq = CurrentSeq} = Index,
        index_pid = IndexPid,
        updater_pid = nil,
        waiting_list = Waiters
    } = State
) when WantedSeq > CurrentSeq ->
    #index{dbname = DbName, name = IdxName, ddoc_id = DDocId} = Index,
    LookupDbName = mem3:dbname(DbName),
    <<"_design/", GroupId/binary>> = DDocId,
    case dreyfus_util:in_black_list(LookupDbName, GroupId, IdxName) of
        false ->
            Updater = spawn_link(fun() ->
                dreyfus_index_updater:update(IndexPid, Index)
            end),
            {noreply, State#state{
                updater_pid = Updater,
                waiting_list = [{From, WantedSeq} | Waiters]
            }};
        _ ->
            % NOTE(review): the caller is neither queued nor replied to on
            % this path, so it gets no answer from this server - confirm
            % that callers rely on their own timeout here.
            couch_log:notice(
                "Index Blocked from Updating - db: ~p,"
                " ddocid: ~p name: ~p",
                [DbName, DDocId, IdxName]
            ),
            {noreply, State}
    end;
handle_call(
    {await, WantedSeq},
    _From,
    #state{index = #index{current_seq = CurrentSeq}, index_pid = IndexPid} = State
) when WantedSeq =< CurrentSeq ->
    {reply, {ok, IndexPid, CurrentSeq}, State};
handle_call({await, WantedSeq}, From, #state{waiting_list = Waiters} = State) ->
    Queued = [{From, WantedSeq} | Waiters],
    {noreply, State#state{waiting_list = Queued}};
% used by to_index_pid/1 to obtain the index pid from a local server
handle_call(get_index_pid, _From, #state{index_pid = IndexPid} = State) ->
    {reply, IndexPid, State};
% obsolete: the public search/2 now invokes search_int/2 itself
handle_call({search, QueryArgs0}, _From, #state{index_pid = IndexPid} = State) ->
    {reply, search_int(IndexPid, QueryArgs0), State};
% obsolete: the public group1/2 now invokes group1_int/2 itself
handle_call({group1, QueryArgs0}, _From, #state{index_pid = IndexPid} = State) ->
    {reply, group1_int(IndexPid, QueryArgs0), State};
% obsolete: the public group2/2 now invokes group2_int/2 itself
handle_call({group2, QueryArgs0}, _From, #state{index_pid = IndexPid} = State) ->
    {reply, group2_int(IndexPid, QueryArgs0), State};
% obsolete: the public info/1 now invokes info_int/1 itself
handle_call(info, _From, #state{index_pid = IndexPid} = State) ->
    {reply, info_int(IndexPid), State}.

%% No casts are part of this server's protocol; any cast is ignored and
%% the state is left untouched.
handle_cast(_Ignored, State) ->
    {noreply, State}.

%% Exit and monitor messages (the process traps exits, see init/1).
%%
%%   * current updater exits with {updated, NewSeq}: record NewSeq, answer
%%     every waiter whose sequence is now satisfied and, if some remain,
%%     start another updater unless the index is black-listed;
%%   * any other process exits with {updated, _}: ignored;
%%   * the index pid exits: waiters get {error, Reason}, server stops;
%%   * the updater exits for another reason: same, server stops;
%%   * any other linked process exits: server stops;
%%   * any 'DOWN' message: waiters get {error, Reason}, server stops.
handle_info(
    {'EXIT', UpPid, {updated, NewSeq}},
    #state{
        index = #index{} = Index0,
        index_pid = IndexPid,
        updater_pid = UpPid,
        waiting_list = Waiters
    } = State
) ->
    Index = Index0#index{current_seq = NewSeq},
    case reply_with_index(IndexPid, Index, Waiters) of
        [] ->
            {noreply, State#state{
                index = Index,
                updater_pid = nil,
                waiting_list = []
            }};
        StillWaiting ->
            #index{dbname = DbName, name = IdxName, ddoc_id = DDocId} = Index0,
            LookupDbName = mem3:dbname(DbName),
            <<"_design/", GroupId/binary>> = DDocId,
            NextUpdater =
                case dreyfus_util:in_black_list(LookupDbName, GroupId, IdxName) of
                    true ->
                        couch_log:notice(
                            "Index Blocked from Updating - db: ~p, ddocid: ~p"
                            " name: ~p",
                            [DbName, GroupId, IdxName]
                        ),
                        nil;
                    false ->
                        spawn_link(fun() ->
                            dreyfus_index_updater:update(IndexPid, Index)
                        end)
                end,
            {noreply, State#state{
                index = Index,
                updater_pid = NextUpdater,
                waiting_list = StillWaiting
            }}
    end;
handle_info({'EXIT', _, {updated, _}}, State) ->
    {noreply, State};
handle_info(
    {'EXIT', IndexPid, Reason},
    #state{
        index = Index,
        index_pid = IndexPid,
        waiting_list = Waiters
    } = State
) ->
    couch_log:notice(
        "index for ~p closed with reason ~p", [index_name(Index), Reason]
    ),
    reply_error_to_waiters(Waiters, Reason),
    {stop, normal, State};
handle_info(
    {'EXIT', UpPid, Reason},
    #state{
        index = Index,
        updater_pid = UpPid,
        waiting_list = Waiters
    } = State
) ->
    couch_log:info(
        "Shutting down index server ~p, updater ~p closing w/ reason ~w",
        [index_name(Index), UpPid, Reason]
    ),
    reply_error_to_waiters(Waiters, Reason),
    {stop, normal, State};
handle_info({'EXIT', Pid, Reason}, State) ->
    % probably dreyfus_index_manager.
    couch_log:notice("Unknown pid ~p closed with reason ~p", [Pid, Reason]),
    {stop, normal, State};
handle_info(
    {'DOWN', _, _, Pid, Reason},
    #state{
        index = Index,
        waiting_list = Waiters
    } = State
) ->
    couch_log:info(
        "Shutting down index server ~p, db ~p closing w/ reason ~w",
        [index_name(Index), Pid, Reason]
    ),
    reply_error_to_waiters(Waiters, Reason),
    {stop, normal, State}.

%% Sends {error, Reason} to every {From, Seq} entry of the waiting list.
reply_error_to_waiters(Waiters, Reason) ->
    [gen_server:reply(From, {error, Reason}) || {From, _} <- Waiters],
    ok.

% private functions.

%% Opens the clouseau index stored under "<DbName>/<Sig>" with the given
%% analyzer and fetches its update sequence.
%% Returns {ok, IndexPid, Seq}, or whatever non-{ok, _} term either
%% clouseau_rpc call produced.
open_index(DbName, #index{analyzer = Analyzer, sig = Sig}) ->
    Path = <<DbName/binary, "/", Sig/binary>>,
    with_update_seq(clouseau_rpc:open_index(self(), Path, Analyzer)).

%% Extends a successful open result with the index's update sequence;
%% anything else is passed through unchanged.
with_update_seq({ok, IndexPid}) ->
    case clouseau_rpc:get_update_seq(IndexPid) of
        {ok, UpdateSeq} -> {ok, IndexPid, UpdateSeq};
        SeqError -> SeqError
    end;
with_update_seq(OpenError) ->
    OpenError.

%% Builds the #index{} record for the index called IndexName in a design
%% document.
%%
%% Returns:
%%   {ok, #index{}}                        - the entry is an object with an
%%                                           "index" member;
%%   {error, {not_found, Msg}}             - no entry named IndexName;
%%   {error, {invalid_design_doc, Msg}}    - the entry is not an object or
%%                                           lacks the "index" member.
%% The analyzer defaults to <<"standard">> and the language to
%% <<"javascript">>. The signature is the hex-encoded MD5 of the
%% {Analyzer, Def} term.
design_doc_to_index(#doc{id = Id, body = {Fields}}, IndexName) ->
    Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
    {AllIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
    Invalid =
        {invalid_design_doc, <<"index `", IndexName/binary, "` must have parameter `index`">>},
    case lists:keyfind(IndexName, 1, AllIndexes) of
        {IndexName, {Props}} ->
            Analyzer = couch_util:get_value(<<"analyzer">>, Props, <<"standard">>),
            case couch_util:get_value(<<"index">>, Props) of
                undefined ->
                    {error, Invalid};
                Def ->
                    Digest = couch_hash:md5_hash(?term_to_bin({Analyzer, Def})),
                    Sig = ?l2b(couch_util:to_hex(Digest)),
                    {ok, #index{
                        analyzer = Analyzer,
                        ddoc_id = Id,
                        def = Def,
                        def_lang = Language,
                        name = IndexName,
                        sig = Sig
                    }}
            end;
        false ->
            {error, {not_found, <<IndexName/binary, " not found.">>}};
        _ ->
            {error, Invalid}
    end.

%% Answers every waiter the index now satisfies and returns the waiters
%% that still have to wait (delegates to reply_with_index/4 with an empty
%% accumulator).
reply_with_index(IndexPid, Index, WaitList) ->
    NoneWaitingYet = [],
    reply_with_index(IndexPid, Index, WaitList, NoneWaitingYet).

%% Walks the waiting list once. A waiter whose requested sequence is at or
%% below the index's current_seq is sent {ok, IndexPid, IndexSeq}; every
%% other waiter is pushed onto the accumulator, which is returned (so the
%% remaining waiters come back in reverse traversal order, on top of Acc).
reply_with_index(IndexPid, Index, WaitList, Acc) ->
    lists:foldl(
        fun({From, Seq} = Waiter, StillWaiting) ->
            case Index of
                #index{current_seq = IndexSeq} when Seq =< IndexSeq ->
                    gen_server:reply(From, {ok, IndexPid, IndexSeq}),
                    StillWaiting;
                _ ->
                    [Waiter | StillWaiting]
            end
        end,
        Acc,
        WaitList
    ).

%% Human-readable identifier used in log messages:
%% "<dbname> <ddoc id> <index name>" as a single binary.
index_name(#index{dbname = DbName, ddoc_id = DDocId, name = IndexName}) ->
    Sep = <<" ">>,
    <<DbName/binary, Sep/binary, DDocId/binary, Sep/binary, IndexName/binary>>.

%% Converts #index_query_args{} into the property list passed to
%% clouseau_rpc:search/2. `refresh` is true only when `stale` is exactly
%% false; the bookmark is sent under the 'after' key.
args_to_proplist(#index_query_args{
    q = Query,
    partition = Partition,
    limit = Limit,
    stale = Stale,
    bookmark = Bookmark,
    sort = Sort,
    include_fields = IncludeFields,
    counts = Counts,
    ranges = Ranges,
    drilldown = Drilldown,
    highlight_fields = HlFields,
    highlight_pre_tag = HlPreTag,
    highlight_post_tag = HlPostTag,
    highlight_number = HlNumber,
    highlight_size = HlSize
}) ->
    Refresh = Stale =:= false,
    [
        {query, Query},
        {partition, Partition},
        {limit, Limit},
        {refresh, Refresh},
        {'after', Bookmark},
        {sort, Sort},
        {include_fields, IncludeFields},
        {counts, Counts},
        {ranges, Ranges},
        {drilldown, Drilldown},
        {highlight_fields, HlFields},
        {highlight_pre_tag, HlPreTag},
        {highlight_post_tag, HlPostTag},
        {highlight_number, HlNumber},
        {highlight_size, HlSize}
    ].

%% Converts #index_query_args{} into the property list passed to
%% clouseau_rpc:group2/2. The grouping field, groups and group sort come
%% from the nested #grouping{} record; `refresh` is true only when `stale`
%% is exactly false.
args_to_proplist2(#index_query_args{
    q = Query,
    stale = Stale,
    grouping = Grouping,
    sort = Sort,
    limit = Limit,
    include_fields = IncludeFields,
    highlight_fields = HlFields,
    highlight_pre_tag = HlPreTag,
    highlight_post_tag = HlPostTag,
    highlight_number = HlNumber,
    highlight_size = HlSize
}) ->
    [
        {query, Query},
        {field, Grouping#grouping.by},
        {refresh, Stale =:= false},
        {groups, Grouping#grouping.groups},
        {group_sort, Grouping#grouping.sort},
        {sort, Sort},
        {limit, Limit},
        {include_fields, IncludeFields},
        {highlight_fields, HlFields},
        {highlight_pre_tag, HlPreTag},
        {highlight_post_tag, HlPostTag},
        {highlight_number, HlNumber},
        {highlight_size, HlSize}
    ].

%% Passes the query args through dreyfus_util:upgrade/1, converts them to
%% a property list and runs the search on the index pid.
search_int(Pid, QueryArgs0) ->
    Upgraded = dreyfus_util:upgrade(QueryArgs0),
    clouseau_rpc:search(Pid, args_to_proplist(Upgraded)).

%% Passes the query args through dreyfus_util:upgrade/1 and issues the
%% group1 request on the index pid. The refresh flag sent to clouseau is
%% true only when `stale` is exactly false; offset, limit and sort are
%% taken from the nested #grouping{} record.
group1_int(Pid, QueryArgs0) ->
    #index_query_args{q = Query, stale = Stale, grouping = Grouping} =
        dreyfus_util:upgrade(QueryArgs0),
    #grouping{by = GroupBy, sort = Sort, offset = Offset, limit = Limit} = Grouping,
    Refresh = Stale =:= false,
    clouseau_rpc:group1(Pid, Query, GroupBy, Refresh, Sort, Offset, Limit).

%% Passes the query args through dreyfus_util:upgrade/1, converts them
%% with args_to_proplist2/1 and issues the group2 request on the index pid.
group2_int(Pid, QueryArgs0) ->
    Upgraded = dreyfus_util:upgrade(QueryArgs0),
    clouseau_rpc:group2(Pid, args_to_proplist2(Upgraded)).

%% Returns whatever clouseau_rpc:info/1 reports for the given index pid.
info_int(IndexPid) ->
    clouseau_rpc:info(IndexPid).
